package com.gitee.sop.adminserver.helper;

import com.gitee.sop.adminserver.bean.ServiceInfo;
import com.gitee.sop.adminserver.bean.ServiceInstance;
import lombok.Getter;
import lombok.Setter;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.Watcher;
import org.slf4j.MDC;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

/**
 * @author jiangj
 * @version 1.0.0
 * @ClassName ZookeeperHelper.java
 * @Description TODO
 * @createTime 2021年12月27日 11:11:00
 */
@Deprecated
public class ZookeeperHelper implements ApplicationContextAware {

    @Value("${zk.discovery.server-addr}")
    private String zkAddress;

    //public static final String SUBSCRIBE_LIST = "SUBSCRIBE-LIST";

    //private static final String      K8S_SERVICE_IP        = "K8S_SERVICE_IP";

    public static final String            SOA_SERVICE_NODE  = "/soaservices";
    private static final Set<String> subscribeServiceNames = new HashSet<>();

    private  ZkClient zkClient = null;
    private static final int                              DEFAULT_SESSION_TIMEOUT = 30 * 1000;
    private static Map<String, ServiceInfo> serviceAddress = new HashMap<>();
    //private static Map<String, String>            serviceConfig;

    //private static Set<String> registerNodeNames = new HashSet<>();

    //private static String customIp;
    //private static String podAddress;
    //private static String k8sServiceIp;
    private static Object LOCK = new Object();

    private static SOPConfigListener sopConfigListener = (new ZookeeperHelper()).new SOPConfigListener();

    private static ApplicationContext applicationContext = null;



    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ZookeeperHelper.applicationContext = applicationContext;
    }

    @PostConstruct
    private void getZkClient(){
        if (zkClient == null) {
            synchronized (LOCK) {
                if (zkClient == null) {
                    try {
                        zkClient = new ZkClient(zkAddress, DEFAULT_SESSION_TIMEOUT, 5000, new SOPZkSerializer());
                    }catch (Exception e){
                        String v = "";
                    }
                }
            }
        }

        initSoa();
        //return zkClient;
    }

    /**
     * 初始化 SOAService
     * @author Chenjw
     * @since 2018/6/13
     **/
    private  void initSoa() {
        //Map<String, String> env = System.getenv();
        /*
        if (!StringHelper.isNullOrEmpty(env.get(K8S_SERVICE_IP))) {
            k8sServiceIp = env.get(K8S_SERVICE_IP);
            //log.info("get k8s service ip in environment variable:{}", k8sServiceIp);
        }*/

        if (zkClient.exists(SOA_SERVICE_NODE)) {
            zkClient.subscribeChildChanges(SOA_SERVICE_NODE, sopConfigListener);
            List<String> yardList = zkClient.getChildren(SOA_SERVICE_NODE);
            initSoaChildren(yardList);
        } else {
            //log.error("soaservices node was not found!");
        }
    }



    /**
     * 获取所属环境的服务可用IP（轮询）
     * @param serviceName 服务名称
     * @return 可用IP，如果没有可用IP则返回null
     */
    public  String getServiceAddress(String serviceName) {
        // 不存在该服务节点，则初始化并且订阅
        /*
        if (!existService(serviceName)) {
            if (zkClient.exists(SwjConfig.SOA_SERVICE_NODE)) {
                List<String> yardList = zkClient.getChildren(SwjConfig.SOA_SERVICE_NODE);
                if (!yardList.contains(serviceName)) {
                    return null;
                }
                addSubscribeServiceName(serviceName);
                initSoaChildren(yardList);
            } else {
                log.error("soaservices node was not found!");
                return null;
            }
        }*/

        //if(true){
        //if (type == null) {
            // 优先取podIp节点，再取serviceIp节点
            ServiceInfo soaServiceAddress = serviceAddress.get(serviceName);
            String ip = getNode(soaServiceAddress,NodeType.POD);
            if (!org.springframework.util.StringUtils.isEmpty(ip)) {
                return ip;
                //return serviceAddress.get(serviceName).getNode(NodeType.POD);
            }

            return getNode(soaServiceAddress,NodeType.K8S);
        //}
        //return serviceAddress.get(serviceName).getNode(type);
    }

    public static boolean existNode(ServiceInfo serviceInfo,NodeType nodeType,String ip){
        List<ServiceInstance> serviceInstanceList = getNodeList(serviceInfo,nodeType);
        if (serviceInfo==null || serviceInstanceList==null || serviceInstanceList.size()<=0) {
            return false;
        }

        serviceInstanceList = serviceInstanceList.stream().filter(i->{
            return i.equals(new ServiceInstance(ip));
        }).collect(Collectors.toList());

        if(serviceInstanceList!=null && serviceInstanceList.size()>0){
            return true;
        }
        return false;
    }

    private static List<ServiceInstance> getNodeList(ServiceInfo serviceInfo,NodeType nodeType){

        if(serviceInfo!=null && serviceInfo.getInstances()!=null && serviceInfo.getInstances().size()>0){

            if(nodeType==null){
                return serviceInfo.getInstances();
            }
            List<ServiceInstance> serviceInstanceList = serviceInfo.getInstances().stream().filter(i->{
                if(i.getMetadata()!=null && i.getMetadata().containsKey("nodeType")){
                    if(i.getMetadata().get("nodeType").equals(nodeType.name())){
                        return true;
                    }

                }
                return false;
            }).collect(Collectors.toList());

            return serviceInstanceList;
        }

        return null;
    }

    private static String getNode(ServiceInfo serviceInfo,NodeType nodeType){

        List<ServiceInstance> serviceInstanceList = getNodeList(serviceInfo,nodeType);
        if(serviceInstanceList!=null && serviceInstanceList.size()>0){

            if (serviceInstanceList.size() == 1) {
                return serviceInstanceList.get(0).getIp();
            } else {

                return serviceInstanceList.get(ThreadLocalRandom.current().nextInt(serviceInstanceList.size())).getIp();
            }
        }

        return "";

    }



    public  Map<String, ServiceInfo> getServiceAddresses() {
        return Collections.unmodifiableMap(serviceAddress);
    }

    /**
     * 重置所有SOA服务提供者信息
     * @author liuhf
     * @since 2018/8/4
     */
    private static void reset() {
        for (String key : serviceAddress.keySet()) {
            resetService(key, null);
        }
        //log.info("service center has been clear");
    }



    /**
     * 重置指定服务的可用节点列表
     * @param serviceName 服务名称
     * @param soaServiceNodes 服务节点集合
     */
    private static void resetService(String serviceName, Map<String, List<ServiceInstance>> soaServiceNodes) {

        if (!serviceAddress.containsKey(serviceName)) {
            serviceAddress.put(serviceName, new ServiceInfo());
        }

        ServiceInfo service = serviceAddress.get(serviceName);
        service.setServiceId(serviceName);
        //service.lock();
        //String oldService = service.toString();

        List<ServiceInstance> instances = new ArrayList<>();

        if(soaServiceNodes!=null && soaServiceNodes.size()>0){
            soaServiceNodes.forEach(new BiConsumer<String, List<ServiceInstance>>() {
                @Override
                public void accept(String s, List<ServiceInstance> serviceInstances) {
                    instances.addAll(serviceInstances);
                }
            });
        }

        try {
            //service.clear();
            service.setInstances(instances);
        } catch (Exception e) {
            //log.error("reset service [{}] error", serviceName, e);
        }
        //service.unLock();

        // 节点内容无变化时不打印节点变化日志
        //if (oldService.equals(service.toString())) return;
        //log.info("get service-ip map from zk, serviceName:[{}], serviceIp:[{}]", serviceName, service.toString());
    }

    private  void initSoaChildren(List<String> children) {
        for (String yard : children) {
            /*
            if (!"ALL".equalsIgnoreCase(SwjConfig.get(SUBSCRIBE_LIST)) && !subscribeServiceNames.contains(yard)) {
                continue;
            }
            if (SOAServiceCenter.existService(yard)) {
                continue;
            }*/
            if(serviceAddress.containsKey(yard)){
                continue;
            }
            zkClient.subscribeChildChanges(getZKPath(SOA_SERVICE_NODE, yard), sopConfigListener);
            initSoaChildrenInner(yard);
            //log.debug("init and listener soa service: {}", yard);
        }
    }

    public static void removeSoaNode(String serviceName, NodeType type, String ip) {

        ServiceInfo service = serviceAddress.get(serviceName);
        if(existNode(service,type,ip)){
            service.getInstances().remove(new ServiceInstance(ip));
        }


    }

    public static void addSoaNode(String serviceName, NodeType type, String ip) {
        ServiceInfo service = serviceAddress.get(serviceName);
        if (existNode(service,type,ip)) {
            return;
        }
        /*
        service.lock();
        try {
            service.add(type, ip);
        } catch (Exception e) {
            log.error("add service [{}-{}-{}] error", serviceName, type, ip, e);
        }
        service.unLock();
        log.info("add service-ip:[{}] from serviceName:[{}], current serviceIps:[{}]", ip, serviceName, service.toString());*/

        ServiceInstance serviceInstance = new ServiceInstance();
        serviceInstance.setIp(ip);
        Map<String,String> metadata = new HashMap<>();
        metadata.put("nodeType",type.name());
        serviceInstance.setMetadata(metadata);
        service.getInstances().add(serviceInstance);
    }




    private  void initSoaChildrenInner(String appName) {
        List<String> nodes = zkClient.getChildren(getZKPath(SOA_SERVICE_NODE, appName));
        Map<String, List<ServiceInstance>> soaServiceNodes = new HashMap<>();
        List<ServiceInstance> serviceIpList = new ArrayList<>();
        List<ServiceInstance> podIpList = new ArrayList<>();
        List<ServiceInstance> customList = new ArrayList<>();
        if(nodes!=null && nodes.size()>0){
        //if (!ListHelper.isNullOrEmpty(nodes)) {
            for (String node : nodes) {
                String path = getZKPath(SOA_SERVICE_NODE, appName, node);
                if (!zkClient.exists(path)) {
                    continue;
                }
                String value = zkClient.readData(path);
                // 判断节点是否可用
                NodeProperties nodeProperties = new NodeProperties().parse(value);
                if (!nodeProperties.isEnabled()) {
                    continue;
                }

                //ServiceInstance soaServiceNode = new ServiceInstance(node, nodeProperties);
                ServiceInstance soaServiceNode = new ServiceInstance();

                try {
                    if(node.contains(",")){
                        soaServiceNode.setIp(node);

                    } else if (node.contains(":")) {
                        String[] nodeInfo = node.split(":");
                        if(nodeInfo.length==2) {
                            soaServiceNode.setIp(nodeInfo[0]);
                            soaServiceNode.setPort(new Integer(nodeInfo[1]));
                        }else{
                            soaServiceNode.setIp(node);
                        }
                    } else {
                        soaServiceNode.setIp(node);
                    }
                }catch (Exception e){
                    String v = "";
                }
                soaServiceNode.setStatus("UP");


                // k8s service ip节点 检查是否含有pod 有则添加
                if (NodeType.K8S == nodeProperties.getType()) {
                    zkClient.subscribeChildChanges(path, sopConfigListener);
                    zkClient.subscribeDataChanges(path, sopConfigListener);
                    List list = zkClient.getChildren(path);
                    if(list!=null && list.size()>0 && !serviceIpList.contains(soaServiceNode)){
                    //if (!ListHelper.isNullOrEmpty(list) && !serviceIpList.contains(soaServiceNode)) {

                        Map<String,String> metadata = new HashMap<>();
                        metadata.put("nodeType",NodeType.K8S.name());
                        soaServiceNode.setMetadata(metadata);
                        serviceIpList.add(soaServiceNode);
                        //log.debug("find soa service k8sServiceIp node: {}", path);
                    }
                }
                // pod节点
                else if (NodeType.POD == nodeProperties.getType()) {
                    zkClient.subscribeDataChanges(path, sopConfigListener);
                    if (!podIpList.contains(soaServiceNode)) {
                        Map<String,String> metadata = new HashMap<>();
                        metadata.put("nodeType",NodeType.POD.name());
                        soaServiceNode.setMetadata(metadata);
                        podIpList.add(soaServiceNode);
                        //log.debug("find soa service podIp node: {}", path);
                    }
                }
                // 自定义节点
                else {
                    if (!customList.contains(soaServiceNode)) {
                        Map<String,String> metadata = new HashMap<>();
                        metadata.put("nodeType",NodeType.CUSTOM.name());
                        soaServiceNode.setMetadata(metadata);
                        customList.add(soaServiceNode);
                        //log.debug("find soa service customIp node: {}", path);
                    }
                }
            }
        }

        if (serviceIpList.size()>0) {
            soaServiceNodes.put(NodeType.K8S.toString(), serviceIpList);
        }
        if (podIpList.size()>0) {
            soaServiceNodes.put(NodeType.POD.toString(), podIpList);
        }
        if (customList.size()>0) {
            soaServiceNodes.put(NodeType.CUSTOM.toString(), customList);
        }
        if(soaServiceNodes.values().size()>0) {
            this.resetService(appName, soaServiceNodes);
        }
    }

    public  void reloadSoaService(String parentPath, List<String> children) {
        if (parentPath.equals(SOA_SERVICE_NODE)) {
            if(children!=null && children.size()>0){
            //if (!ListHelper.isNullOrEmpty(children)) {
                initSoaChildren(children);
            } else {
                reset();
            }
        } else {
            String[] temp = parentPath.split("/");
            String key = temp[2];
            initSoaChildrenInner(key);
        }
    }


    private static String getZKPath(String... nodes) {
        StringBuilder result = new StringBuilder();
        for (String node : nodes) {
            if (node.startsWith("/")) result.append(node);
            else result.append("/").append(node);
        }
        return result.toString();
    }


    class SOPZkSerializer implements ZkSerializer {
        @Override
        public byte[] serialize(Object data) throws ZkMarshallingError {
            try {
                return String.valueOf(data).getBytes("UTF-8");
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            return null;
        }

        @Override
        public Object deserialize(byte[] bytes) throws ZkMarshallingError {
            try {
                return new String(bytes, "UTF-8");
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            return null;
        }
    }

    class SOPConfigListener implements IZkDataListener, IZkChildListener, IZkStateListener {

        @Override
        public void handleChildChange(String parentPath, List<String> children) {
            //MDC.put(Constant.REQ_ID, TraceIdHelper.generate());
            try {
                if (parentPath.contains(SOA_SERVICE_NODE)) {
                    //log.debug("zk reloadSoaService due to childChange from {}", parentPath);

                    ZookeeperHelper.applicationContext.getBean(ZookeeperHelper.class).reloadSoaService(parentPath, children);
                    //this.reloadSoaService(parentPath, children);
                }
            } catch (Exception ex) {
                //log.error("handleChildChange", ex);
            }
            //MDC.clear();
        }

        @Override
        public void handleStateChanged(Watcher.Event.KeeperState state) {
            /*
            if (state.equals(Watcher.Event.KeeperState.SyncConnected)) {
                if (Boolean.parseBoolean(SwjConfig.getIsRegisterApi())) {
                    SOAServiceCenter.registerServiceAddress();
                }
            }*/
            //log.info("zk connect state:{}", state.toString());
        }

        @Override
        public void handleNewSession() {
            //log.info("handleNewSession");
        }

        @Override
        public void handleSessionEstablishmentError(Throwable error) {
            //log.info("handleSessionEstablishmentError");
        }

        @Override
        public void handleDataChange(String dataPath, Object data) {
            //MDC.put(Constant.REQ_ID, TraceIdHelper.generate());
            try {
                if (dataPath.contains(SOA_SERVICE_NODE)) {
                    //log.debug("handleDataChange, dataPath: {}, value: {}", dataPath, data);
                    String[] array = dataPath.split("/");
                    // 节点格式：/soaservices/appName/ipNode
                    if (array.length < 4) {
                        //log.error("handleDataChange, dataPath [{}] is invalid", dataPath);
                        return;
                    }
                    if (data==null) {
                        return;
                    }
                    NodeProperties nodeProperties = new NodeProperties().parse(data.toString());
                    // k8s节点需要有子节点才添加
                    if (nodeProperties.isEnabled()) {
                        if (NodeType.K8S == nodeProperties.getType() && zkClient!=null && CollectionUtils.isEmpty(zkClient.getChildren(dataPath))) {
                            ZookeeperHelper.applicationContext.getBean(ZookeeperHelper.class).removeSoaNode(array[2], nodeProperties.getType(), array[3]);
                            return;
                        }
                        ZookeeperHelper.applicationContext.getBean(ZookeeperHelper.class).addSoaNode(array[2], nodeProperties.getType(), array[3]);
                    } else {
                        ZookeeperHelper.applicationContext.getBean(ZookeeperHelper.class).removeSoaNode(array[2], nodeProperties.getType(), array[3]);
                    }
                }
            } catch (Exception ex) {
                //log.error("handleChildChange", ex);
            }
            //MDC.clear();
        }

        @Override
        public void handleDataDeleted(String dataPath) {

        }
    }

    enum NodeType {

        K8S,
        POD,
        CUSTOM

    }

    @Getter
    @Setter
    class NodeProperties {

        private static final String SPLIT = "&";

        private NodeType type = NodeType.CUSTOM;

        private boolean enabled = true;

        public NodeProperties() {
        }

        public NodeProperties(NodeType type) {
            this.type = type;
        }

        public NodeProperties parse(String value) {
            NodeProperties nodeProperties = new NodeProperties();
            if (StringUtils.isNotBlank(value)) {
                String[] array = value.split(SPLIT);
                // 长度为1，说明是旧的数据格式，只存了节点类型
                if (array.length == 1) {
                    nodeProperties.setType(NodeType.valueOf(value.toUpperCase()));
                    return nodeProperties;
                }
                for (String unit : array) {
                    String[] keyValue = unit.split("=");
                    assign(nodeProperties, keyValue[0], keyValue[1]);
                }
            }
            return nodeProperties;
        }

        private void assign(NodeProperties nodeProperties, String key, String value) {
            switch (key) {
                case "type":
                    nodeProperties.setType(NodeType.valueOf(value.toUpperCase()));
                    break;
                case "enabled":
                    nodeProperties.setEnabled(Boolean.parseBoolean(value));
                    break;
                default:
                    throw new IllegalStateException("unknown node property: " + key);
            }
        }

    }
}
